package main

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"log"
	"strings"
	"time"
)

func main() {
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my-topic123", "my-topic456"} // TODO 测试的topic...
	groupName := "g1"                                // TODO 测试的group...

	config := sarama.NewConfig()
	//config.Version = sarama.V2_6_0_0 // Use Kafka version 2.6.0.0
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}

	consumer, err := sarama.NewConsumerGroup(brokers, groupName, config)
	if err != nil {
		log.Fatalln("Error creating consumer group:", err)
	}
	defer consumer.Close()

	ctx := context.Background()

	for {
		err1 := consumer.Consume(ctx, topics, &ConsumerGroupHandler{})
		fmt.Println("err1>>>>>>>> ", err1)
		// Notice 网络异常..... 注意这里跟 消费单独分区ConsumerPartition方法返回的error不一样
		// Notice 可以参考ConsumerPartition的处理方式～字符串包含～
		if err1 != nil {
			// TODO 实际中可能会有更多重试的情况～～
			if strings.Contains(err1.Error(), "client has run out of available brokers") ||
				strings.Contains(err1.Error(), "kafka: broker for ID is not found") ||
				strings.Contains(err1.Error(), "EOF") {

				log.Println("Network error, reconnecting...!!!!!!!!!!!!!!!!!!!!")
				time.Sleep(5 * time.Second) // Wait before reconnecting
				continue
			} else {
				// TODO 其他的异常...
				if err != nil {
					log.Println("消费出问题了!!!", err1)
				}
			}
		}
	}

	//log.Println("Shutting down the consumer...")

}

type ConsumerGroupHandler struct{}

func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	// Initialization before consuming starts
	return nil
}

func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// Clean up resources after consuming ends
	return nil
}

func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("Message claimed: value = %s, topic = %s, partition = %d, offset = %d\n", message.Value, message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}
	return nil
}
